\section{Consumer Service}\label{sec:Consumer}\index{consumer}
\subsection{Description}

Consumer resources are created by the Consumer Service at the request
of a user who wishes to query one or more tables in the virtual
database. Each Consumer represents a single SQL SELECT query. It runs
the query on the user's behalf by contacting all of the producers
necessary to answer it, and collects the results into an internal
tuple buffer from which the user can
subsequently retrieve them. Tuples are always \textit{pushed} by
producer services to a \textit{streaming server} running in each
Consumer Service.

The Consumer Service is responsible for authenticating all users and services
that connect to it, and for authorizing all operations, as specified in
chapter \ref{sec:Security}.

The mediator\index{mediator} is the component within the consumer which handles its interface
to registry instances and maintains the consumer's query plan. All communication 
between the consumer and the registry is through the mediator. 

\subsection{Interface}

\subsubsection{User Interface}

\begin{method}{createConsumer}

\inpar{xsd:string select}{Consumer's SQL SELECT query.}

\inpar{xsd:string queryType}{continuous, history, latest or static.}
\inpar{xsd:int(0..1) timeIntervalSec}{Optional time interval in seconds to go
back in time for the query.}
\inpar{xsd:int(0..1) timeoutSec}{Optional query timeout in seconds.}
\inpar{xsd:string(0..*) producerConnections}{Optional list of producer endpoints
to contact. Each endpoint is encoded as the string representation of the integer producer
resource identifier, followed by a single space and the producer resource URL.}
\outhead{Tuple(1..1)}{}
\outpar{xsd:int connectionId}{connectionId of new Consumer resource.} 
\desc
Creates a new Consumer resource and returns its connection identifier. The query is not
started until the user calls \textit{start} or \textit{startDirected}. The query
is validated by calling \textit{getFullTableDetails} in the appropriate schema
for each table in the query. The subset of SQL supported by R-GMA for queries
is specified in detail in \ref{sec:SQLSelect}: both tables and views (see
\ref{sec:SecurityViews}) can be queried. All table and view names must have
explicit virtual database name prefixes (separated from them by a dot).
The mediator places further restrictions
on the complexity of queries supported by each query type. The query type and 
interval are described in
\ref{sec:BackgroundQueryTypes}; the interval is relative to the creation time of
the consumer resource in the service (an absolute start time is passed to the
producer service). The termination interval is described in
\ref{sec:ConsumerCreating}. Mediates and starts the Consumer's query, as described in
\ref{sec:ConsumerStarting} below. The query will be aborted by the consumer
service if it is still executing when the timeout is reached. The query will
also be aborted if the consumer's tuple buffer fills up. Does not wait for
the query to finish before returning. Queries can only be started once. 

If there is a user-supplied list of producers this is used instead of obtaining 
one from the mediator. All listed producers will be contacted. It is the 
responsibility of the user to ensure the list of producers they provide will be 
able to answer the query correctly.
\end{method}
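
The \textit{producerConnections} encoding described above can be illustrated with a minimal sketch. The type and function names, and the URL used in the usage note, are hypothetical and not part of the R-GMA interface; the sketch only assumes the format stated above (integer identifier, one space, URL).

```python
from typing import NamedTuple


class ProducerEndpoint(NamedTuple):
    """Hypothetical holder for one producerConnections entry."""
    resource_id: int  # integer producer resource identifier
    url: str          # producer resource URL


def encode_endpoint(endpoint: ProducerEndpoint) -> str:
    """Encode as '<integer id> <url>' with exactly one separating space."""
    return f"{endpoint.resource_id} {endpoint.url}"


def decode_endpoint(text: str) -> ProducerEndpoint:
    """Split on the first space only, so the URL is kept verbatim."""
    id_part, sep, url = text.partition(" ")
    if not sep or not url:
        raise ValueError(f"expected '<id> <url>', got {text!r}")
    # int() raises ValueError if the identifier is not an integer.
    return ProducerEndpoint(int(id_part), url)
```

For instance, decoding the (made-up) entry \texttt{42 https://example.org/producer} would yield identifier 42 and the URL unchanged; an entry without a space is rejected.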

\begin{method}[consumer]{abort}			
\inpar{xsd:int connectionId}{Consumer resource identifier.}
\OK
\desc Aborts a running query as described in \ref{sec:ConsumerStarting} below.
Returns when the query has aborted. Any tuples already held in the consumer
may still be retrieved by calling \textit{pop}. Does \textit{not} unregister
the consumer.
\end{method}
				
\begin{method}{hasAborted}
\inpar{xsd:int connectionId}{Consumer resource identifier.}
\outhead{Tuple(1..1)}{}
\outpar{xsd:boolean hasAborted}{True if user has aborted the query, or the query
timed out.} 
\desc Checks if the user has aborted a running query, or the query has timed
out. Query must have already been started by calling \textit{start} or
\textit{startDirected}.
\end{method}

\begin{method}{pop}
\inpar{xsd:int connectionId}{Consumer resource identifier.}
\inpar{xsd:int maxCount}{Maximum number of tuples to pop.}
\outhead{Tuple(0..*)}{The results of the query. The fields returned depend
upon the query.}
\desc Retrieves at most \textit{maxCount} tuples from the Consumer resource. Can
be called at any point after a query is first started (even after \textit{abort}).
Tuples are returned in a result set with column metadata. The result set will be
empty if no tuples are available
at the time \textit{pop} is called. An \textit{endOfResults} flag is attached to the
final result set, i.e. when the query has finished executing (or has aborted)
and \textit{pop} has returned all available tuples. The mediator will attach a
warning to a result set if it cannot guarantee the completeness of the results.\\
\end{method}

See also the common resource management service operations:
close~\ref{op:close} and 
destroy~\ref{op:destroy}.

See also the operations common to all services:
getProperty~\ref{op:getProperty}.

\subsubsection{System Interface}

\begin{method}{addProducer}
\inpar{xsd:int connectionId}{Consumer resource identifier.}
\inpar{xsd:string url}{Producer URL.}
\inpar{xsd:int id}{Producer resource identifier.}
\inpar{xsd:string tableName}{Table name.}
\inpar{xsd:string predicate}{Predicate.}
\inpar{xsd:int hrpSec}{History Retention Period in seconds.}
\inpar{xsd:boolean isHistory}{True if the producer supports history queries.}
\inpar{xsd:boolean isLatest}{True if the producer supports latest queries.}
\inpar{xsd:boolean isContinuous}{True if the producer supports continuous queries.}
\inpar{xsd:boolean isStatic}{True if the producer supports static queries.}
\inpar{xsd:boolean isSecondaryProducer}{True if the producer is a secondary producer.}
\inpar{xsd:string qosAttrib}{The QoS attribute (not currently used).}
\OK
\desc Sent by a producer service to a continuous consumer to notify it about an
addition to the list of relevant producers in the registry. Ignored if the
query is not currently executing. See the \textit{plan maintenance} section
(\ref{sec:ConsumerPlanMaintenance}) for how the consumer should react to this.
\end{method}

\begin{method}{removeProducer}
\inpar{xsd:int connectionId}{Consumer resource identifier.}
\inpar{xsd:string url}{Producer URL.}
\inpar{xsd:int id}{Producer resource identifier.}
\OK
\end{method}

See also the common resource management service operation:
ping~\ref{op:resourceping}.

\subsection{Details}
\subsubsection{Creating and destroying Consumers}\label{sec:ConsumerCreating}

A new Consumer Resource is created when a user calls the Consumer 
Service's \textit{createConsumer} operation and is destroyed when the user calls
the \textit{close} or \textit{destroy} operations. In addition, if the service does
not hear from the user for a period exceeding the \textit{termination interval},
the service will initiate a \textit{close} operation on the resource. A call to
any user operation on the resource is sufficient to keep it alive.
To use the virtual
database properly, the user should normally run a mediated query. The query is
run on the user's behalf by the consumer service, and the user should
call \textit{pop} repeatedly until it returns a result
set\index{result set} containing an \textit{endOfResults} flag. If
there are no results available in the Consumer resource's buffer at
the time at which \textit{pop} is called, an empty result set will be
returned, but more tuples may become available if the query is still
executing.  If the user wants to abort a query, they can call
\textit{abort}. A timeout may be specified when a consumer is created in which
case the query will
be automatically aborted if it is still executing after the time has elapsed
(the user can call \textit{hasAborted} to see if this has occurred). The query will also be aborted if the consumer's tuple
buffer fills up (the service should be configured with enough overflow buffer
space on disk so that a well-behaved consumer is unlikely to hit this
limit). The user can safely pop tuples from the consumer
following an abort, but they will only get tuples that are already in
the consumer resource's buffer. 
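
The retrieval pattern described above (call \textit{pop} repeatedly until the \textit{endOfResults} flag is seen, treating an empty result set as ``nothing yet'') can be sketched from the user's side as follows. This is only an illustration: the result-set class, the in-memory stand-in for a Consumer resource, and the polling interval are all assumptions and do not correspond to any actual R-GMA client API.

```python
import time
from dataclasses import dataclass, field
from typing import List


@dataclass
class ResultSet:
    """Hypothetical result set: tuples plus the endOfResults flag."""
    tuples: List[tuple] = field(default_factory=list)
    end_of_results: bool = False


class FakeConsumer:
    """In-memory stand-in for a Consumer resource (illustration only)."""

    def __init__(self, result_sets):
        self._pending = list(result_sets)

    def pop(self, max_count: int) -> ResultSet:
        # Hands back scripted result sets in order; max_count is ignored here.
        return self._pending.pop(0)


def drain(consumer, max_count: int = 100, poll_interval_sec: float = 1.0):
    """Pop until endOfResults is seen and return every tuple collected."""
    collected = []
    while True:
        result = consumer.pop(max_count)
        collected.extend(result.tuples)
        if result.end_of_results:
            return collected
        if not result.tuples:
            # Empty result set: the query may still be executing, so wait.
            time.sleep(poll_interval_sec)
```

Note that a loop of this kind only terminates for a continuous query once that query has been aborted or has timed out, since only then is the \textit{endOfResults} flag attached.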

\subsubsection{Starting and stopping queries (service)}\label{sec:ConsumerStarting}

The consumer service starts a query by identifying which producers it should 
contact. For a directed query (\textit{startDirected}), this is specified by 
the user, and the consumer service will simply contact all listed producers and 
merge the results, whether or not that makes sense. Consumers running directed 
queries are not registered in the registry, even for continuous queries.  For a 
mediated query, the consumer service must obtain a 
\textit{query plan} from the mediator. The mediator forms a layer between the 
consumer service and the registry services of all VDBs referenced in the 
consumer's query. A side-effect of this mediator call for continuous consumers 
is to register them so that they will receive notifications from all new 
producers. The query plan is essentially a set of instructions to the consumer 
on which producers to send a \textit{start} message to, and with what query (it 
may also contain instructions to stop streaming from certain producers). The 
consumer service merges the results streamed back from all of the producers 
into the consumer resource's internal tuple buffer, from where they can be 
popped later by the user. The mediator may also attach warnings to the query 
plan regarding potential unreliability of results; these must be added to all 
result sets for the query by the consumer service.

One-time queries automatically stop when the last tuple has been streamed
to the consumer, but they will also stop if the user calls \textit{abort}
or the query runs for longer than the query timeout specified by the user in
the call to \textit{createConsumer}. Continuous queries only stop when they are aborted
in one of these two ways. Abort messages are forwarded on to the producers (by
calling each producer service's \textit{abort} operation) so they can clean up any
system and database resources associated with the query.

\subsubsection{Plan maintenance}\label{sec:ConsumerPlanMaintenance}

Since starting a query that involves multiple producers is not an atomic
process, some calls to the producers may start while others fail. All types
of queries must adapt to this. Continuous queries must also adapt to
producers failing, or producers being added or removed during the lifetime of
their long-running query. In all cases, the consumer service must request an
updated query plan from the mediator and must start or stop streaming from
producers as directed by the new plan. The various cases are explained here.

In the case of a failure of a \textit{start} call to a producer, the consumer
service must call the mediator to get a new plan that excludes the
broken producer.
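
One possible way to structure the ``start, and re-plan on failure'' behaviour described above is sketched below. The plan structure, the callables standing in for the mediator and the producer operations, and the bound on the number of rounds are all assumptions made for illustration; the specification does not prescribe them.

```python
from dataclasses import dataclass, field
from typing import Callable, FrozenSet, List, Set, Tuple

Producer = Tuple[int, str]  # (resource identifier, URL); hypothetical shape


@dataclass
class QueryPlan:
    """Hypothetical plan: producers to start, producers to stop, warnings."""
    to_start: List[Producer] = field(default_factory=list)
    to_stop: List[Producer] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)


def apply_plan(
    get_plan: Callable[[FrozenSet[Producer]], QueryPlan],
    start: Callable[[Producer], None],
    stop: Callable[[Producer], None],
    max_rounds: int = 5,
):
    """Apply plans until every start call succeeds.

    get_plan(excluded) stands in for the mediator call; start(producer)
    is assumed to raise ConnectionError when the producer is broken.
    """
    excluded: Set[Producer] = set()
    streaming: Set[Producer] = set()
    for _ in range(max_rounds):
        plan = get_plan(frozenset(excluded))
        failed = False
        for producer in plan.to_stop:
            stop(producer)
            streaming.discard(producer)
        for producer in plan.to_start:
            if producer in streaming:
                continue  # already streaming from this producer
            try:
                start(producer)
                streaming.add(producer)
            except ConnectionError:
                excluded.add(producer)  # ask for a plan without it
                failed = True
        if not failed:
            return streaming, list(plan.warnings)
    raise RuntimeError("no workable plan within the allowed rounds")
```

The warnings returned alongside the set of streaming producers are those of the last plan applied, which the consumer service would then attach to result sets.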

The consumer service also detects producers that worked initially but have
subsequently failed, by periodically checking that it has received either
a tuple from each producer, or a plan update (see below) containing that
producer. If neither has happened since the last check, the consumer service
sends a \textit{ping} to the producer and, if the service cannot be contacted
or returns an UnknownResourceException, the consumer requests a new plan from
the mediator. The mediator may attach a warning to the query plan if it cannot be
certain that a broken producer is no longer relevant.
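
The periodic liveness check described above might be organised roughly as follows. This is a sketch under stated assumptions: the per-producer state record, the \texttt{ping} callable, and the use of \texttt{ConnectionError} to model an unreachable service are illustrative choices, not part of the specification.

```python
from dataclasses import dataclass


@dataclass
class ProducerState:
    # Set to True whenever a tuple arrives from the producer or a plan
    # update containing the producer is received (hypothetical bookkeeping).
    seen_since_last_check: bool = False


class UnknownResourceException(Exception):
    """Stand-in for the exception named in the text."""


def check_producers(states, ping):
    """Run one periodic check and return the producers that look broken.

    states maps each producer to its ProducerState; ping(producer) is
    assumed to raise ConnectionError or UnknownResourceException on failure.
    """
    broken = []
    for producer, state in states.items():
        if not state.seen_since_last_check:
            try:
                ping(producer)
            except (ConnectionError, UnknownResourceException):
                broken.append(producer)
        # Reset the flag so the next period starts from a clean slate.
        state.seen_since_last_check = False
    return broken
```

If the returned list is non-empty, the consumer service would request a new plan from the mediator.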

Continuous queries are registered in all relevant registries so that the consumer can be
notified when new producers are added to the registry. The notification comes
in the form of an \textit{addProducer} message from the producer itself. When
the consumer service receives an \textit{addProducer}, it must contact the
mediator and start or stop streaming from producers as instructed.

Registry replication means that a consumer must also actively ensure its plan
remains up to date. It does this as a side-effect of updating its registration
via the mediator\footnote{This polling is less efficient than
relying on the registry to notify consumers of changes, but is more robust.}.
This operation will query the registries for any new producers, and has the side
effect of keeping the consumer registered. As in the other mediation calls, the
consumer must act on the instructions of the new plan.

\subsubsection{Streaming}\label{sec:ConsumerStreaming}\index{streaming}

Tuples are pushed by producers into a \textit{streaming
server}\index{streaming server} in the consumer service that then
delivers them to the tuple buffers of the (local) consumer resources
for which they are intended. Note that tuples are not streamed all the
way to the user: the user must still ultimately pull them from the
consumer by calling \textit{pop}. The streaming server is an
implementation-defined server application listening on a well-defined
TCP socket, the details of which are passed to each producer in the
call to \textit{start}. Tuples are returned in
\textit{chunks}\index{chunk}, each preceded by a 32-bit network byte
ordered integer containing the target consumer's resource identifier,
and terminated by an end-of-chunk flag (a single byte with the value
1). The consumer service sets a limit on the maximum number of tuples
in a chunk, in the call to \textit{start}.  Producers will continue to
send chunks for continuous queries until the query is aborted, but a
one-time query will terminate after the last chunk, so this is
indicated by an end-of-results flag (a single byte with the value 2)
after the end-of-chunk flag of the last chunk for the one-time
query. Chunks are encoded as XML\index{XML} as defined in
\ref{sec:TupleXML}.
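
The chunk framing described above can be sketched as a pair of helper functions. The function names are hypothetical, the identifier is assumed to be a signed 32-bit integer (matching xsd:int), and the decoder assumes it is handed exactly one frame whose XML payload is UTF-8 encoded, so that a byte with the value 1 cannot occur inside it. The payload in the usage note is a placeholder, not the encoding defined in \ref{sec:TupleXML}.

```python
import struct

END_OF_CHUNK = b"\x01"    # terminates every chunk
END_OF_RESULTS = b"\x02"  # follows the last chunk of a one-time query


def encode_chunk(consumer_id: int, xml_payload: bytes, last: bool = False) -> bytes:
    """Frame one chunk: 32-bit id (network byte order), XML, end-of-chunk flag."""
    header = struct.pack("!i", consumer_id)
    frame = header + xml_payload + END_OF_CHUNK
    return frame + END_OF_RESULTS if last else frame


def decode_chunk(frame: bytes):
    """Decode exactly one frame into (consumer_id, xml_payload, end_of_results)."""
    (consumer_id,) = struct.unpack("!i", frame[:4])
    end = frame.index(END_OF_CHUNK, 4)  # raises ValueError if the flag is missing
    trailer = frame[end + 1:]
    if trailer not in (b"", END_OF_RESULTS):
        raise ValueError("unexpected bytes after end-of-chunk flag")
    return consumer_id, frame[4:end], trailer == END_OF_RESULTS
```

For example, framing the placeholder payload \texttt{<x/>} for consumer 7 as the last chunk of a one-time query gives the four identifier bytes 0, 0, 0, 7, then the payload, then the bytes 1 and 2.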

If a consumer's tuple buffer is full and a new chunk is received for it by
the streaming server, the query will be aborted and the connection closed.
If a chunk is received for a consumer that
does not exist, the streaming server will notify the producer\footnote{Whether
this is done in an immediate response or asynchronously remains to be decided.}.
An encrypted connection is used for streaming (see
chapter \ref{sec:Security}).
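
The buffer-full rule in the paragraph above could be realised along the following lines. This is a minimal sketch: the class name, the capacity measured in tuples, and the choice to treat ``the chunk does not fit'' as ``buffer full'' are assumptions, and the overflow-to-disk space mentioned elsewhere in this section is not modelled.

```python
from collections import deque


class TupleBuffer:
    """Hypothetical bounded tuple buffer for one consumer resource."""

    def __init__(self, capacity: int):
        self.capacity = capacity  # maximum number of buffered tuples
        self.aborted = False
        self._tuples = deque()

    def deliver(self, chunk) -> bool:
        """Called by the streaming server; False means the query is aborted."""
        if self.aborted:
            return False
        if len(self._tuples) + len(chunk) > self.capacity:
            # Assumption: a chunk that does not fit counts as "buffer full".
            self.aborted = True
            return False
        self._tuples.extend(chunk)
        return True

    def pop(self, max_count: int):
        """Return up to max_count tuples; still usable after an abort."""
        count = min(max_count, len(self._tuples))
        return [self._tuples.popleft() for _ in range(count)]
```

On a False return the streaming server would close the connection, while tuples already buffered remain available to \textit{pop}.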
